{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## 前言\n",
    "\n",
    "我们都知道 python 中可以是 `threading` 模块实现多线程, 但是模块并没有提供暂停, 恢复和停止线程的方法, 一旦线程对象调用 `start` 方法后, 只能等到对应的方法函数运行完毕. 也就是说一旦 `start` 后, 线程就属于失控状态. 对于函数中没有循环，可以使用`join()`来结束循环。\n",
    "\n",
    "不过, 我们可以自己实现这些. 一般的方法就是循环地判断一个标志位, 一旦标志位到达到预定的值, 就退出循环. 这样就能做到退出线程了\n",
    "\n",
    "## 方法一：Event\n",
    "\n",
    "- 文档说明：Event.wait(timeout=None)\n",
    "\n",
    "```\n",
    "\n",
    "    Block until the internal flag is true. If the internal flag is true on entry, return immediately. Otherwise, block until another thread calls set() to set the flag to true, or until the optional timeout occurs.\n",
    "\n",
    "    阻塞, 直到内部的标志位为True时. 如果在内部的标志位在进入时为True时, 立即返回. 否则, 阻塞直到其他线程调用`set()`方法将标志位设为True, 或者到达了可选参数的timeout时间.\n",
    "\n",
    "\n",
    "    When the timeout argument is present and not None, it should be a floating point number specifying a timeout for the operation in seconds (or fractions thereof).\n",
    "\n",
    "    This method returns the internal flag on exit, so it will always return True except if a timeout is given and the operation times out.\n",
    "\n",
    "    当给定了timeout参数且不为None, 它应该是以秒为单位的浮点数，指定操作的超时时间（或是分数）。\n",
    "\n",
    "    此方法在退出时返回内部标志，因此除非给定了超时参数且操作确实超时了，否则它将始终返回True。\n",
    "\n",
    "\n",
    "    Changed in version 2.7: Previously, the method always returned None.\n",
    "\n",
    "    2.7版本以前, 这个方法总会返回None.\n",
    "    \n",
    "```"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### 实现原理\n",
    "\n",
    "利用`wait` 的阻塞机制, 就能够实现暂停和恢复了, 再配合循环判断标识位, 就能实现退出了, 下面是代码示例:\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 1,
   "metadata": {},
   "outputs": [],
   "source": [
    "# !/usr/bin/env python\n",
    "# coding: utf-8\n",
    "\n",
    "import threading\n",
    "import time\n",
    "\n",
    "\n",
    "class Job(threading.Thread):\n",
    "    def __init__(self, *args, **kwargs):\n",
    "        super(Job, self).__init__(*args, **kwargs)\n",
    "        self.__flag = threading.Event()  # 用于暂停线程的标识\n",
    "        self.__flag.set()  # 设置为True\n",
    "        self.__running = threading.Event()  # 用于停止线程的标识\n",
    "        self.__running.set()  # 将running设置为True\n",
    "\n",
    "    def run(self):\n",
    "        while self.__running.isSet():\n",
    "            self.__flag.wait()  # 为True时立即返回, 为False时阻塞直到内部的标识位为True后返回\n",
    "            print(int(time.time()))\n",
    "            time.sleep(1)\n",
    "\n",
    "    def pause(self):\n",
    "        self.__flag.clear()  # 设置为False, 让线程阻塞\n",
    "\n",
    "    def resume(self):\n",
    "        self.__flag.set()  # 设置为True, 让线程停止阻塞\n",
    "\n",
    "    def stop(self):\n",
    "        self.__running.clear()  # 设置为 False\n",
    "        self.__flag.set()  # 将线程从暂停状态恢复, 如果已经暂停的话"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 2,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "1590401672\n",
      "1590401673\n",
      "1590401674\n",
      "1590401678\n",
      "1590401679\n",
      "1590401680\n",
      "1590401683\n"
     ]
    }
   ],
   "source": [
    "# 下面是测试代码:\n",
    "a = Job()\n",
    "a.start()\n",
    "time.sleep(3)\n",
    "a.pause()\n",
    "time.sleep(3)\n",
    "a.resume()\n",
    "time.sleep(3)\n",
    "a.pause()\n",
    "time.sleep(2)\n",
    "a.stop()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### 说明\n",
    "\n",
    "这完成了暂停, 恢复和停止的功能. 但是这里有一个缺点:   \n",
    "无论是暂停还是停止, 都**不是瞬时的**, 必须等待 `run` 函数内部的运行到达标志位判断时才有效. 也就是说操作会滞后一次.\n",
    "\n",
    "但是这有时也不一定是坏事. 如果 `run` 函数中涉及了文件操作或数据库操作等, 完整地运行一次后再退出, 反而能够执行剩余的资源释放操作的代码 (例如各种 close). 不会出现程序的文件操作符超出上限, 数据库连接未释放等尴尬的情况."
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## 方法二：信号量Sempaphore\n",
    "\n",
    "提供`acquire`方法和`release`方法，每当调用`acquire`方法的时候，如果内部计数器大于0，则将其减1，如果内部计数器等于0，则会阻塞该线程，知道有线程调用了`release`方法将内部计数器更新到大于1位置。\n",
    "\n",
    "```py\n",
    "class threading.Sempaphore(value = 1)\n",
    "```\n",
    "semaphore 是一个内部的计数器,它会随着`acquire()`的请求减小,也会随着`release()`的释放增加。这个计数器的值不会小于零,当`acquier()` 发现计数器的值为0时,那么当前对象会等待直到其他对象`release()`为止。\n",
    "\n",
    "实现参考上一种方法：\n",
    "- acquier(blocking = True ,timeout = None)  \n",
    "代替 `Event.clear()` 方法\n",
    "- release()  \n",
    "代替 `Event.set()` 方法\n",
    "\n",
    "### 说明\n",
    "\n",
    "在高并发环境中反复调用 `acquier()`和 `release()` 会产生的锁资源开销比上一种方法大，不如上一种方法好用。"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## 强行终止线程\n",
    "\n",
    "根据线程 id 强行终止线程"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 3,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "-------\n",
      "-------\n",
      "-------\n",
      "-------\n",
      "-------\n",
      "-------\n",
      "main thread sleep finish\n"
     ]
    }
   ],
   "source": [
    "import threading\n",
    "import time\n",
    "import inspect\n",
    "import ctypes\n",
    "\n",
    "\n",
    "def _async_raise(tid: int, exctype):\n",
    "    \"\"\"raises the exception, performs cleanup if needed\"\"\"\n",
    "    tid = ctypes.c_long(tid)\n",
    "    if not inspect.isclass(exctype):\n",
    "        exctype = type(exctype)\n",
    "    res = ctypes.pythonapi.PyThreadState_SetAsyncExc(tid, ctypes.py_object(exctype))\n",
    "    if res == 0:\n",
    "        raise ValueError(\"Invalid thread id\")\n",
    "    elif res != 1:\n",
    "        # if it returns a number greater than one, you're in trouble,\n",
    "        # and you should call it again with exc=NULL to revert the effect\n",
    "        ctypes.pythonapi.PyThreadState_SetAsyncExc(tid, None)\n",
    "        raise SystemError(\"PyThreadState_SetAsyncExc failed\")\n",
    "\n",
    "\n",
    "def stop_thread(thread):\n",
    "    _async_raise(thread.ident, SystemExit)\n",
    "\n",
    "\n",
    "def test():\n",
    "    while True:\n",
    "        print('-------')\n",
    "        time.sleep(1)\n",
    "\n",
    "\n",
    "if __name__ == \"__main__\":\n",
    "    t = threading.Thread(target=test)\n",
    "    t.start()\n",
    "    time.sleep(5.2)\n",
    "    print(\"main thread sleep finish\")\n",
    "    stop_thread(t)"
   ]
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Python 3",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.7.3"
  },
  "toc": {
   "base_numbering": 1,
   "nav_menu": {},
   "number_sections": true,
   "sideBar": true,
   "skip_h1_title": false,
   "title_cell": "Table of Contents",
   "title_sidebar": "Contents",
   "toc_cell": false,
   "toc_position": {
    "height": "calc(100% - 180px)",
    "left": "10px",
    "top": "150px",
    "width": "188.352px"
   },
   "toc_section_display": true,
   "toc_window_display": true
  },
  "varInspector": {
   "cols": {
    "lenName": 16,
    "lenType": 16,
    "lenVar": 40
   },
   "kernels_config": {
    "python": {
     "delete_cmd_postfix": "",
     "delete_cmd_prefix": "del ",
     "library": "var_list.py",
     "varRefreshCmd": "print(var_dic_list())"
    },
    "r": {
     "delete_cmd_postfix": ") ",
     "delete_cmd_prefix": "rm(",
     "library": "var_list.r",
     "varRefreshCmd": "cat(var_dic_list()) "
    }
   },
   "types_to_exclude": [
    "module",
    "function",
    "builtin_function_or_method",
    "instance",
    "_Feature"
   ],
   "window_display": false
  }
 },
 "nbformat": 4,
 "nbformat_minor": 2
}
